/*     */ package com.pl.client;
/*     */ 
/*     */ import cn.hutool.core.util.CharsetUtil;
/*     */ import cn.hutool.core.util.StrUtil;
/*     */ import com.fasterxml.jackson.core.JsonProcessingException;
/*     */ import com.fasterxml.jackson.databind.ObjectMapper;
/*     */ import java.util.Collections;
/*     */ import java.util.concurrent.ExecutionException;
/*     */ import java.util.concurrent.atomic.AtomicBoolean;
/*     */ import javax.annotation.PostConstruct;
/*     */ import com.pl.configuration.OpsProperties;
import com.pl.event.*;
import com.pl.util.ObjectMapperUtil;
import org.apache.kafka.clients.admin.AdminClient;
/*     */ import org.apache.kafka.clients.admin.CreateTopicsResult;
/*     */ import org.apache.kafka.clients.admin.DescribeTopicsResult;
/*     */ import org.apache.kafka.clients.admin.NewTopic;
/*     */ import org.apache.kafka.clients.admin.TopicDescription;
/*     */ import org.apache.kafka.common.KafkaFuture;
/*     */ import org.apache.kafka.common.serialization.ByteArraySerializer;
/*     */ import org.apache.kafka.common.serialization.StringSerializer;
/*     */ import org.slf4j.Logger;
/*     */ import org.slf4j.LoggerFactory;
/*     */ import org.springframework.beans.factory.annotation.Autowired;
/*     */ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
/*     */ import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
/*     */ import org.springframework.core.env.Environment;
/*     */ import org.springframework.kafka.core.KafkaTemplate;
/*     */ import org.springframework.kafka.support.SendResult;
/*     */ import org.springframework.stereotype.Service;
/*     */ import org.springframework.util.concurrent.ListenableFuture;
/*     */ 
/*     */ @ConditionalOnProperty(name = {"mos.ops.client-type"}, havingValue = "mq", matchIfMissing = true)
/*     */ @Service
/*     */ public class KafkaOpsClient
/*     */   implements IOpsClient {
/*  41 */   private Logger log = LoggerFactory.getLogger(KafkaOpsClient.class);
/*     */   
/*     */   @Autowired
/*     */   private KafkaTemplate<String, Object> kafkaTemplate;
/*     */   
/*     */   @Autowired
/*     */   private OpsProperties opsProperties;
/*     */   
/*     */   @Autowired
/*     */   private KafkaProperties kafkaProperties;
/*     */   
/*  52 */   private ObjectMapper mapper = ObjectMapperUtil.createObjectMapper();
/*     */ 
/*     */   
/*     */   @Autowired
/*     */   private Environment env;
/*     */ 
/*     */   
/*     */   private String topic;
/*     */ 
/*     */   
/*     */   private static final int PARTITION_NUMBER = 16;
/*     */ 
/*     */   
/*     */   private static final short REPLICATION_FACTOR = 1;
/*     */ 
/*     */ 
/*     */   
/*     */   @PostConstruct
/*     */   private void init() throws ExecutionException, InterruptedException {
/*  71 */     AdminClient adminClient = AdminClient.create(this.kafkaProperties.buildAdminProperties());
/*  72 */     AtomicBoolean result = new AtomicBoolean(true);
/*     */     
/*  74 */     this.topic = this.opsProperties.getMqTopic();
/*     */     
/*  76 */     if (StrUtil.isBlank(this.topic)) {
/*  77 */       this.topic = this.env.getProperty("project", "mos") + "-ops";
/*     */     }
/*     */     
/*  80 */     this.log.info("check topic {}", this.topic);
/*  81 */     DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(this.topic));
/*  82 */     KafkaFuture<TopicDescription> future = (KafkaFuture<TopicDescription>)describeTopicsResult.values().get(this.topic);
/*     */     try {
/*  84 */       TopicDescription topicDescription = (TopicDescription)future.get();
/*  85 */       this.log.info("topic info {}", topicDescription);
/*     */     }
/*  87 */     catch (Exception e) {
/*  88 */       if (e.getCause() instanceof org.apache.kafka.common.errors.UnknownTopicOrPartitionException) {
/*  89 */         NewTopic newTopic = new NewTopic(this.topic, 16, (short)1);
/*  90 */         CreateTopicsResult topics = adminClient.createTopics(Collections.singletonList(newTopic));
/*  91 */         KafkaFuture<Void> createFuture = (KafkaFuture<Void>)topics.values().get(this.topic);
/*  92 */         createFuture.whenComplete((v, t) -> {
/*     */               if (t == null) {
/*     */                 this.log.info("create topic {} successful", this.topic);
/*     */                 result.set(true);
/*     */               } else {
/*     */                 this.log.info("create topic {} failure", this.topic);
/*     */                 result.set(false);
/*     */               } 
/*     */             });
/* 101 */         createFuture.get();
/*     */       } else {
/* 103 */         this.log.warn("check topic error", e);
/*     */       } 
/*     */     } 
/*     */     
/* 107 */     result.get();
/*     */   }
/*     */ 
/*     */   
/*     */   public void sendMethodOpsEvent(MethodOpsEvent event) throws JsonProcessingException {
/* 112 */     sendEvent((AbstractOpsEvent)event);
/*     */   }
/*     */ 
/*     */   
/*     */   public void sendBizLogOpsEvent(BizLogOpsEvent event) throws JsonProcessingException {
/* 117 */     sendEvent((AbstractOpsEvent)event);
/*     */   }
/*     */ 
/*     */   
/*     */   public void sendMethodInvokeLogOpsEvent(MethodInvokeLogOpsEvent event) throws JsonProcessingException {
/* 122 */     sendEvent((AbstractOpsEvent)event);
/*     */   }
/*     */ 
/*     */   
/*     */   public void sendExtApiLogOpsEvent(ExtApiLogOpsEvent event) throws JsonProcessingException {
/* 127 */     sendEvent((AbstractOpsEvent)event);
/*     */   }
/*     */   
/*     */   private void sendEvent(AbstractOpsEvent event) throws JsonProcessingException {
/* 131 */     Object msg = null;
/* 132 */     if (this.kafkaProperties.getProducer().getKeySerializer().isAssignableFrom(ByteArraySerializer.class)) {
/* 133 */       msg = this.mapper.writeValueAsString(event).getBytes(CharsetUtil.CHARSET_UTF_8);
/* 134 */     } else if (this.kafkaProperties.getProducer().getKeySerializer().isAssignableFrom(StringSerializer.class)) {
/* 135 */       msg = this.mapper.writeValueAsString(event);
/*     */     } 
/* 137 */     if (msg != null) {
/*     */       
/* 139 */       ListenableFuture<SendResult<String, Object>> sendResult = this.kafkaTemplate.send(this.topic, msg);
/* 140 */       sendResult.completable().whenComplete((v, t) -> {
/*     */             if (t != null) {
/*     */               this.log.warn("message: {}", event);
/*     */               this.log.warn("send message error", t);
/*     */             } 
/*     */           });
/*     */     } else {
/* 147 */       this.log.warn("unsupported producer key serializer: " + this.kafkaProperties.getProducer().getKeySerializer().getName());
/*     */     } 
/*     */   }
/*     */ }


/* Location:              E:\code\common\cmsr-common-ops-log\1.0.0-SNAPSHOT\cmsr-common-ops-log-1.0.0-SNAPSHOT.jar!\com\cmsr\common\ops\log\client\KafkaOpsClient.class
 * Java compiler version: 8 (52.0)
 * JD-Core Version:       1.1.3
 */